Preface
This article takes a look at Flink's BlobService.
BlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
```java
/**
 * A simple store and retrieve binary large objects (BLOBs).
 */
public interface BlobService extends Closeable {

    /**
     * Returns a BLOB service for accessing permanent BLOBs.
     *
     * @return BLOB service
     */
    PermanentBlobService getPermanentBlobService();

    /**
     * Returns a BLOB service for accessing transient BLOBs.
     *
     * @return BLOB service
     */
    TransientBlobService getTransientBlobService();

    /**
     * Returns the port of the BLOB server that this BLOB service is working with.
     *
     * @return the port the blob server.
     */
    int getPort();
}
```
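- BlobService defines getPermanentBlobService, which returns a PermanentBlobService, and getTransientBlobService, which returns a TransientBlobService. getPort returns the port of the BLOB server that the service works with.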
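Here is a minimal plain-Java sketch of the facade shape. `SimpleBlobService`, `PermanentStore` and `TransientStore` are hypothetical stand-ins, not Flink classes, and the sketch says nothing about how Flink wires its real implementations. It only shows one `Closeable` entry point that hands out the two sub-services, reports a port, and closes both sub-services together.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-ins for PermanentBlobService / TransientBlobService (not Flink types).
interface PermanentStore extends Closeable {}

interface TransientStore extends Closeable {}

// Hypothetical facade mirroring the shape of Flink's BlobService interface.
class SimpleBlobService implements Closeable {
    private final PermanentStore permanentStore;
    private final TransientStore transientStore;
    private final int port;

    SimpleBlobService(PermanentStore permanentStore, TransientStore transientStore, int port) {
        this.permanentStore = permanentStore;
        this.transientStore = transientStore;
        this.port = port;
    }

    PermanentStore getPermanentBlobService() {
        return permanentStore;
    }

    TransientStore getTransientBlobService() {
        return transientStore;
    }

    int getPort() {
        return port;
    }

    // Closing the facade closes both sub-services; the transient one is closed
    // even if closing the permanent one throws.
    @Override
    public void close() throws IOException {
        try {
            permanentStore.close();
        } finally {
            transientStore.close();
        }
    }
}
```

The try/finally is a design choice of this sketch. It keeps one failing sub-service from leaking the other.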
PermanentBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
```java
/**
 * A service to retrieve permanent binary large objects (BLOBs).
 *
 * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
 * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
 * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
 */
public interface PermanentBlobService extends Closeable {

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId ID of the job this blob belongs to
     * @param key BLOB key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException if the BLOB does not exist;
     * @throws IOException if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
}
```
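- PermanentBlobService provides a getFile method that returns the local File for a given JobID and PermanentBlobKey. It throws a FileNotFoundException if the BLOB does not exist. Permanent BLOBs are covered by HA mode. Examples are a job's JAR files and files in the DistributedCache.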
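A small lookup table can illustrate the (JobID, PermanentBlobKey) addressing. This is a hypothetical sketch, not Flink API. `LocalPermanentCache`, `register` and the string-typed IDs are illustrative assumptions. It reproduces only two parts of the documented contract: a file is found by the pair of job and key, and an unknown BLOB yields a `FileNotFoundException`.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical lookup table illustrating the PermanentBlobService.getFile contract:
// a file is addressed by the pair (job ID, permanent key), never by the key alone.
class LocalPermanentCache {

    // Composite map key; plain strings stand in for JobID and PermanentBlobKey.
    private static final class Entry {
        final String jobId;
        final String blobKey;

        Entry(String jobId, String blobKey) {
            this.jobId = jobId;
            this.blobKey = blobKey;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Entry)) {
                return false;
            }
            Entry e = (Entry) o;
            return jobId.equals(e.jobId) && blobKey.equals(e.blobKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobId, blobKey);
        }
    }

    private final Map<Entry, File> files = new HashMap<>();

    // Hypothetical helper: records where a job's BLOB lives locally.
    void register(String jobId, String blobKey, File file) {
        files.put(new Entry(jobId, blobKey), file);
    }

    // Mirrors the documented contract: FileNotFoundException if the BLOB is unknown.
    File getFile(String jobId, String blobKey) throws IOException {
        File f = files.get(new Entry(jobId, blobKey));
        if (f == null) {
            throw new FileNotFoundException("No BLOB " + blobKey + " for job " + jobId);
        }
        return f;
    }
}
```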
TransientBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java
```java
/**
 * A service to retrieve transient binary large objects (BLOBs) which are deleted on the
 * {@link BlobServer} when they are retrieved.
 *
 * <p>These may include per-job BLOBs like files in the {@link
 * org.apache.flink.api.common.cache.DistributedCache}, for example.
 *
 * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
 * {@link PermanentBlobService}.
 *
 * <p>TODO: change API to not rely on local files but return {@link InputStream} objects
 */
public interface TransientBlobService extends Closeable {

    // --------------------------------------------------------------------------------------------
    //  GET
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the path to a local copy of the (job-unrelated) file associated with the provided
     * blob key.
     *
     * @param key blob key associated with the requested file
     * @return The path to the file.
     * @throws java.io.FileNotFoundException when the path does not exist;
     * @throws IOException if any other error occurs when retrieving the file
     */
    File getFile(TransientBlobKey key) throws IOException;

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId ID of the job this blob belongs to
     * @param key blob key associated with the requested file
     * @return The path to the file.
     * @throws java.io.FileNotFoundException when the path does not exist;
     * @throws IOException if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, TransientBlobKey key) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  PUT
    // --------------------------------------------------------------------------------------------

    /**
     * Uploads the (job-unrelated) data of the given byte array to the BLOB server.
     *
     * @param value the buffer to upload
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(byte[] value) throws IOException;

    /**
     * Uploads the data of the given byte array for the given job to the BLOB server.
     *
     * @param jobId the ID of the job the BLOB belongs to
     * @param value the buffer to upload
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;

    /**
     * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
     *
     * @param inputStream the input stream to read the data from
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while reading the data from the input stream
     *         or uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(InputStream inputStream) throws IOException;

    /**
     * Uploads the data from the given input stream for the given job to the BLOB server.
     *
     * @param jobId ID of the job this blob belongs to
     * @param inputStream the input stream to read the data from
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while reading the data from the input stream
     *         or uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  DELETE
    // --------------------------------------------------------------------------------------------

    /**
     * Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
     *
     * @param key associated with the file to be deleted
     * @return true if the given blob is successfully deleted or non-existing;
     *         false otherwise
     */
    boolean deleteFromCache(TransientBlobKey key);

    /**
     * Deletes the file associated with the provided job ID and blob key from the local cache.
     *
     * @param jobId ID of the job this blob belongs to
     * @param key associated with the file to be deleted
     * @return true if the given blob is successfully deleted or non-existing;
     *         false otherwise
     */
    boolean deleteFromCache(JobID jobId, TransientBlobKey key);
}
```
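- TransientBlobService retrieves transient binary large objects (BLOBs). These BLOBs are deleted on the BlobServer once they are retrieved, and none of them is highly available. It provides three groups of methods:
  - getFile, with or without a JobID.
  - putTransient, taking a byte[] or an InputStream, optionally per job.
  - deleteFromCache, which removes a file from the local cache.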
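The transient lifecycle can be modelled in memory: upload, fetch once from the server, then serve or drop the local copy. This is a hypothetical sketch, not Flink's implementation. `TransientBlobSketch`, `onServer` and the string keys are assumptions. `getFile` returns bytes instead of a local `File`, so the example needs no disk. The key combines a SHA-1 digest with a random UUID to loosely echo BlobKey's hash and random parts; SHA-1 was picked here because its digest is 20 bytes, the same as `BlobKey.SIZE`.

```java
import java.io.FileNotFoundException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of the transient BLOB lifecycle (not Flink's implementation).
class TransientBlobSketch {
    private final Map<String, byte[]> server = new HashMap<>();     // stands in for the BlobServer
    private final Map<String, byte[]> localCache = new HashMap<>(); // stands in for the local cache

    // putTransient: key = "t-" + hex(SHA-1 of content) + "-" + random UUID.
    String putTransient(byte[] value) throws NoSuchAlgorithmException {
        byte[] hash = MessageDigest.getInstance("SHA-1").digest(value);
        StringBuilder sb = new StringBuilder("t-");
        for (byte b : hash) {
            sb.append(String.format("%02x", b));
        }
        String key = sb.append('-').append(UUID.randomUUID()).toString();
        server.put(key, value.clone());
        return key;
    }

    // getFile: serve from the local cache if present; otherwise fetch from the
    // server, which deletes its copy on retrieval.
    byte[] getFile(String key) throws FileNotFoundException {
        byte[] cached = localCache.get(key);
        if (cached != null) {
            return cached;
        }
        byte[] fetched = server.remove(key);
        if (fetched == null) {
            throw new FileNotFoundException("Unknown transient BLOB " + key);
        }
        localCache.put(key, fetched);
        return fetched;
    }

    // deleteFromCache: true if deleted or not present (an in-memory removal cannot fail).
    boolean deleteFromCache(String key) {
        localCache.remove(key);
        return true;
    }

    // Hypothetical inspection helper for the example.
    boolean onServer(String key) {
        return server.containsKey(key);
    }
}
```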
BlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
```java
/**
 * A BLOB key uniquely identifies a BLOB.
 */
public abstract class BlobKey implements Serializable, Comparable<BlobKey> {

    private static final long serialVersionUID = 3847117712521785209L;

    /** Size of the internal BLOB key in bytes. */
    public static final int SIZE = 20;

    /** The byte buffer storing the actual key data. */
    private final byte[] key;

    /** (Internal) BLOB type - to be reflected by the inheriting sub-class. */
    private final BlobType type;

    /** BLOB type, i.e. permanent or transient. */
    enum BlobType {
        /**
         * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
         * available.
         */
        PERMANENT_BLOB,
        /**
         * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
         * highly available.
         */
        TRANSIENT_BLOB
    }

    /** Random component of the key. */
    private final AbstractID random;

    /**
     * Constructs a new BLOB key.
     *
     * @param type whether the referenced BLOB is permanent or transient
     */
    protected BlobKey(BlobType type) {
        this.type = checkNotNull(type);
        this.key = new byte[SIZE];
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type whether the referenced BLOB is permanent or transient
     * @param key the actual key data
     */
    protected BlobKey(BlobType type, byte[] key) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }
        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type whether the referenced BLOB is permanent or transient
     * @param key the actual key data
     * @param random the random component of the key
     */
    protected BlobKey(BlobType type, byte[] key, byte[] random) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }
        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID(random);
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type whether the referenced BLOB is permanent or transient
     * @return BlobKey subclass
     */
    @VisibleForTesting
    static BlobKey createKey(BlobType type) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey();
        } else {
            return new TransientBlobKey();
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type whether the referenced BLOB is permanent or transient
     * @param key the actual key data
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key);
        } else {
            return new TransientBlobKey(key);
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type whether the referenced BLOB is permanent or transient
     * @param key the actual key data
     * @param random the random component of the key
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key, random);
        } else {
            return new TransientBlobKey(key, random);
        }
    }

    /**
     * Returns the hash component of this key.
     *
     * @return a 20 bit hash of the contents the key refers to
     */
    @VisibleForTesting
    public byte[] getHash() {
        return key;
    }

    /**
     * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
     *
     * @return BLOB type, i.e. permanent or transient
     */
    BlobType getType() {
        return type;
    }

    /**
     * Adds the BLOB key to the given {@link MessageDigest}.
     *
     * @param md the message digest to add the BLOB key to
     */
    public void addToMessageDigest(MessageDigest md) {
        md.update(this.key);
    }

    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof BlobKey)) {
            return false;
        }
        final BlobKey bk = (BlobKey) obj;
        return Arrays.equals(this.key, bk.key) &&
            this.type == bk.type &&
            this.random.equals(bk.random);
    }

    @Override
    public int hashCode() {
        int result = Arrays.hashCode(this.key);
        result = 37 * result + this.type.hashCode();
        result = 37 * result + this.random.hashCode();
        return result;
    }

    @Override
    public String toString() {
        final String typeString;
        switch (this.type) {
            case TRANSIENT_BLOB:
                typeString = "t-";
                break;
            case PERMANENT_BLOB:
                typeString = "p-";
                break;
            default:
                // this actually never happens!
                throw new IllegalStateException("Invalid BLOB type");
        }
        return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
    }

    @Override
    public int compareTo(BlobKey o) {
        // compare the hashes first
        final byte[] aarr = this.key;
        final byte[] barr = o.key;
        final int len = Math.min(aarr.length, barr.length);

        for (int i = 0; i < len; ++i) {
            final int a = (aarr[i] & 0xff);
            final int b = (barr[i] & 0xff);
            if (a != b) {
                return a - b;
            }
        }

        if (aarr.length == barr.length) {
            // same hash contents - compare the BLOB types
            int typeCompare = this.type.compareTo(o.type);
            if (typeCompare == 0) {
                // same type - compare random components
                return this.random.compareTo(o.random);
            } else {
                return typeCompare;
            }
        } else {
            return aarr.length - barr.length;
        }
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Auxiliary method to read a BLOB key from an input stream.
     *
     * @param inputStream the input stream to read the BLOB key from
     * @return the read BLOB key
     * @throws IOException throw if an I/O error occurs while reading from the input stream
     */
    static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
        final byte[] key = new byte[BlobKey.SIZE];
        final byte[] random = new byte[AbstractID.SIZE];

        int bytesRead = 0;
        // read key
        while (bytesRead < key.length) {
            final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        // read BLOB type
        final BlobType blobType;
        {
            final int read = inputStream.read();
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB type");
            } else if (read == TRANSIENT_BLOB.ordinal()) {
                blobType = TRANSIENT_BLOB;
            } else if (read == PERMANENT_BLOB.ordinal()) {
                blobType = PERMANENT_BLOB;
            } else {
                throw new IOException("Invalid data received for the BLOB type: " + read);
            }
        }

        // read random component
        bytesRead = 0;
        while (bytesRead < AbstractID.SIZE) {
            final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        return createKey(blobType, key, random);
    }

    /**
     * Auxiliary method to write this BLOB key to an output stream.
     *
     * @param outputStream the output stream to write the BLOB key to
     * @throws IOException thrown if an I/O error occurs while writing the BLOB key
     */
    void writeToOutputStream(final OutputStream outputStream) throws IOException {
        outputStream.write(this.key);
        outputStream.write(this.type.ordinal());
        outputStream.write(this.random.getBytes());
    }
}
```
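- BlobKey is an abstract class with three fields:
  - key: a 20-byte hash (BlobKey.SIZE).
  - type: a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB.
  - random: an AbstractID.

  Its static createKey methods build the subclass that matches a given BlobType. readFromInputStream deserializes a BlobKey from an InputStream. writeToOutputStream serializes a BlobKey to an OutputStream as the key bytes, followed by the type ordinal as a single byte and then the random bytes. BlobKey has two subclasses, PermanentBlobKey and TransientBlobKey.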
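The wire format used by `writeToOutputStream` and `readFromInputStream` has three parts, 37 bytes in total: 20 hash bytes, one byte holding the `BlobType` ordinal, and 16 random bytes (`AbstractID.SIZE`). The class below is a hypothetical, self-contained mirror of that layout; `BlobKeyWire` is not a Flink type. It deliberately swaps Flink's manual read loops for `DataInputStream.readFully`. Both approaches fail with an `EOFException` on truncated input.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical mirror of BlobKey's wire layout: [20-byte hash][1-byte type ordinal][16-byte random].
class BlobKeyWire {
    static final int HASH_SIZE = 20;   // same value as BlobKey.SIZE
    static final int RANDOM_SIZE = 16; // same value as AbstractID.SIZE

    enum BlobType { PERMANENT_BLOB, TRANSIENT_BLOB }

    final byte[] hash;
    final BlobType type;
    final byte[] random;

    BlobKeyWire(byte[] hash, BlobType type, byte[] random) {
        if (hash.length != HASH_SIZE || random.length != RANDOM_SIZE) {
            throw new IllegalArgumentException("bad component size");
        }
        this.hash = hash;
        this.type = type;
        this.random = random;
    }

    void write(OutputStream out) throws IOException {
        out.write(hash);
        out.write(type.ordinal()); // a single byte, as in writeToOutputStream
        out.write(random);
    }

    static BlobKeyWire read(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        byte[] hash = new byte[HASH_SIZE];
        din.readFully(hash); // throws EOFException on a truncated hash
        int t = din.read();
        if (t < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        }
        if (t >= BlobType.values().length) {
            throw new IOException("Invalid data received for the BLOB type: " + t);
        }
        byte[] random = new byte[RANDOM_SIZE];
        din.readFully(random); // throws EOFException on a truncated random part
        return new BlobKeyWire(hash, BlobType.values()[t], random);
    }
}
```

The fixed sizes keep the format self-delimiting. A reader never needs a length prefix, because every key occupies exactly 37 bytes.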
PermanentBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
```java
/**
 * BLOB key referencing permanent BLOB files.
 */
public final class PermanentBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public PermanentBlobKey() {
        super(BlobType.PERMANENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key the actual key data
     */
    PermanentBlobKey(byte[] key) {
        super(BlobType.PERMANENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key the actual key data
     * @param random the random component of the key
     */
    PermanentBlobKey(byte[] key, byte[] random) {
        super(BlobType.PERMANENT_BLOB, key, random);
    }
}
```
- PermanentBlobKey extends BlobKey and fixes its BlobType to BlobType.PERMANENT_BLOB.
TransientBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
```java
/**
 * BLOB key referencing transient BLOB files.
 */
public final class TransientBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public TransientBlobKey() {
        super(BlobType.TRANSIENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key the actual key data
     */
    TransientBlobKey(byte[] key) {
        super(BlobType.TRANSIENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key the actual key data
     * @param random the random component of the key
     */
    TransientBlobKey(byte[] key, byte[] random) {
        super(BlobType.TRANSIENT_BLOB, key, random);
    }
}
```
- TransientBlobKey extends BlobKey and fixes its BlobType to BlobType.TRANSIENT_BLOB.
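The two subclasses differ only in their `BlobType`. As a result, a `PermanentBlobKey` and a `TransientBlobKey` with identical hash bytes are never equal. `compareTo` orders keys first by hash, with bytes compared as unsigned values, then by type, then by the random component. The sketch below re-implements these rules on a hypothetical `SketchKey` class, so they can be checked without Flink on the classpath. Its `hashCode` is a simplification and does not match Flink's.

```java
import java.util.Arrays;

// Hypothetical re-implementation of BlobKey's equals/compareTo/toString rules, for illustration.
final class SketchKey implements Comparable<SketchKey> {
    enum BlobType { PERMANENT_BLOB, TRANSIENT_BLOB }

    final byte[] hash;
    final BlobType type;
    final long upper; // stands in for AbstractID.upperPart
    final long lower; // stands in for AbstractID.lowerPart

    SketchKey(byte[] hash, BlobType type, long upper, long lower) {
        this.hash = hash;
        this.type = type;
        this.upper = upper;
        this.lower = lower;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchKey)) {
            return false;
        }
        SketchKey k = (SketchKey) o;
        return Arrays.equals(hash, k.hash) && type == k.type && upper == k.upper && lower == k.lower;
    }

    @Override
    public int hashCode() { // simplified; not Flink's formula
        int r = Arrays.hashCode(hash);
        r = 37 * r + type.hashCode();
        return 37 * r + Long.hashCode(upper ^ lower);
    }

    @Override
    public int compareTo(SketchKey o) {
        int len = Math.min(hash.length, o.hash.length);
        for (int i = 0; i < len; i++) {
            int a = hash[i] & 0xff; // unsigned byte comparison
            int b = o.hash[i] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        if (hash.length != o.hash.length) {
            return hash.length - o.hash.length;
        }
        int byType = type.compareTo(o.type); // PERMANENT_BLOB sorts before TRANSIENT_BLOB
        if (byType != 0) {
            return byType;
        }
        int byUpper = Long.compare(upper, o.upper); // AbstractID compares upperPart first
        return byUpper != 0 ? byUpper : Long.compare(lower, o.lower);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(type == BlobType.PERMANENT_BLOB ? "p-" : "t-");
        for (byte b : hash) {
            sb.append(String.format("%02x", b));
        }
        // random part as hex, lowerPart first, then upperPart
        return sb.append('-').append(String.format("%016x%016x", lower, upper)).toString();
    }
}
```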
AbstractID
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
```java
/**
 * A statistically unique identification number.
 */
@PublicEvolving
public class AbstractID implements Comparable<AbstractID>, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private static final Random RND = new Random();

    /** The size of a long in bytes. */
    private static final int SIZE_OF_LONG = 8;

    /** The size of the ID in byte. */
    public static final int SIZE = 2 * SIZE_OF_LONG;

    // ------------------------------------------------------------------------

    /** The upper part of the actual ID. */
    protected final long upperPart;

    /** The lower part of the actual ID. */
    protected final long lowerPart;

    /** The memoized value returned by toString(). */
    private transient String toString;

    // --------------------------------------------------------------------------------------------

    /**
     * Constructs a new ID with a specific bytes value.
     */
    public AbstractID(byte[] bytes) {
        if (bytes == null || bytes.length != SIZE) {
            throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
        }

        this.lowerPart = byteArrayToLong(bytes, 0);
        this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
    }

    /**
     * Constructs a new abstract ID.
     *
     * @param lowerPart the lower bytes of the ID
     * @param upperPart the higher bytes of the ID
     */
    public AbstractID(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    /**
     * Copy constructor: Creates a new abstract ID from the given one.
     *
     * @param id the abstract ID to copy
     */
    public AbstractID(AbstractID id) {
        if (id == null) {
            throw new IllegalArgumentException("Id must not be null.");
        }
        this.lowerPart = id.lowerPart;
        this.upperPart = id.upperPart;
    }

    /**
     * Constructs a new random ID from a uniform distribution.
     */
    public AbstractID() {
        this.lowerPart = RND.nextLong();
        this.upperPart = RND.nextLong();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Gets the lower 64 bits of the ID.
     *
     * @return The lower 64 bits of the ID.
     */
    public long getLowerPart() {
        return lowerPart;
    }

    /**
     * Gets the upper 64 bits of the ID.
     *
     * @return The upper 64 bits of the ID.
     */
    public long getUpperPart() {
        return upperPart;
    }

    /**
     * Gets the bytes underlying this ID.
     *
     * @return The bytes underlying this ID.
     */
    public byte[] getBytes() {
        byte[] bytes = new byte[SIZE];
        longToByteArray(lowerPart, bytes, 0);
        longToByteArray(upperPart, bytes, SIZE_OF_LONG);
        return bytes;
    }

    // --------------------------------------------------------------------------------------------
    //  Standard Utilities
    // --------------------------------------------------------------------------------------------

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        } else if (obj != null && obj.getClass() == getClass()) {
            AbstractID that = (AbstractID) obj;
            return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return ((int)  this.lowerPart) ^
                ((int) (this.lowerPart >>> 32)) ^
                ((int)  this.upperPart) ^
                ((int) (this.upperPart >>> 32));
    }

    @Override
    public String toString() {
        if (this.toString == null) {
            final byte[] ba = new byte[SIZE];
            longToByteArray(this.lowerPart, ba, 0);
            longToByteArray(this.upperPart, ba, SIZE_OF_LONG);

            this.toString = StringUtils.byteToHexString(ba);
        }

        return this.toString;
    }

    @Override
    public int compareTo(AbstractID o) {
        int diff1 = Long.compare(this.upperPart, o.upperPart);
        int diff2 = Long.compare(this.lowerPart, o.lowerPart);
        return diff1 == 0 ? diff2 : diff1;
    }

    // --------------------------------------------------------------------------------------------
    //  Conversion Utilities
    // --------------------------------------------------------------------------------------------

    /**
     * Converts the given byte array to a long.
     *
     * @param ba the byte array to be converted
     * @param offset the offset indicating at which byte inside the array the conversion shall begin
     * @return the long variable
     */
    private static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;

        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }

        return l;
    }

    /**
     * Converts a long to a byte array.
     *
     * @param l the long variable to be converted
     * @param ba the byte array to store the result the of the conversion
     * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
     */
    private static void longToByteArray(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }
}
```
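- AbstractID consists of two long fields, upperPart and lowerPart. It offers three ways to set them:
  - The no-argument constructor fills both fields with Random.nextLong.
  - The byte[] constructor parses lowerPart from the first 8 bytes and upperPart from the next 8 bytes, both big-endian.
  - The (lowerPart, upperPart) constructor sets the two fields directly.

  compareTo compares upperPart first and then lowerPart.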
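AbstractID's byte layout can be checked in isolation. The class below copies the two private conversion helpers shown above into a hypothetical holder, `IdBytes`, which is not a Flink class. It then cross-checks them against `java.nio.ByteBuffer`, which is big-endian by default. In both versions the first 8 bytes carry `lowerPart` and the next 8 carry `upperPart`, each with the most significant byte first.

```java
import java.nio.ByteBuffer;

// Hypothetical standalone copy of AbstractID's conversion helpers, used to show the byte layout.
final class IdBytes {
    static final int SIZE_OF_LONG = 8;

    // Big-endian decode: byte at `offset` is the most significant.
    static long byteArrayToLong(byte[] ba, int offset) {
        long l = 0;
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
        }
        return l;
    }

    // Big-endian encode, the inverse of byteArrayToLong.
    static void longToByteArray(long l, byte[] ba, int offset) {
        for (int i = 0; i < SIZE_OF_LONG; ++i) {
            final int shift = i << 3; // i * 8
            ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
        }
    }

    // Same layout as AbstractID.getBytes(): lowerPart first, then upperPart.
    static byte[] toBytes(long lowerPart, long upperPart) {
        byte[] bytes = new byte[2 * SIZE_OF_LONG];
        longToByteArray(lowerPart, bytes, 0);
        longToByteArray(upperPart, bytes, SIZE_OF_LONG);
        return bytes;
    }

    // Cross-check: a default (big-endian) ByteBuffer should produce identical bytes.
    static byte[] toBytesViaByteBuffer(long lowerPart, long upperPart) {
        return ByteBuffer.allocate(2 * SIZE_OF_LONG).putLong(lowerPart).putLong(upperPart).array();
    }
}
```

The byte[] constructor and getBytes() use the same offsets, so an AbstractID survives a bytes round trip unchanged. This is also what lets BlobKey rebuild its random component from the 16 bytes it reads off the stream.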
Summary
- BlobService defines getPermanentBlobService, which returns a PermanentBlobService, and getTransientBlobService, which returns a TransientBlobService. PermanentBlobService provides getFile, which returns the local File for a JobID and PermanentBlobKey. TransientBlobService handles transient BLOBs, which are deleted on the BlobServer once retrieved and are not highly available. It provides getFile, putTransient and deleteFromCache.
- BlobKey is an abstract class with three fields: key, type (a BlobType) and random (an AbstractID). BlobType is either PERMANENT_BLOB or TRANSIENT_BLOB. The static createKey methods build a BlobKey of the right subclass for a BlobType. readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes it to an OutputStream. BlobKey has two subclasses: PermanentBlobKey, whose BlobType is BlobType.PERMANENT_BLOB, and TransientBlobKey, whose BlobType is BlobType.TRANSIENT_BLOB.
- AbstractID consists of two long fields, upperPart and lowerPart. The no-argument constructor generates both with Random.nextLong. The byte[] constructor parses lowerPart and upperPart from the given bytes. The (lowerPart, upperPart) constructor sets both directly.